1   package org.apache.lucene.util;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.DataInputStream;
21  import java.io.IOException;
22  import java.nio.charset.StandardCharsets;
23  import java.nio.file.Files;
24  import java.nio.file.Path;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Comparator;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.apache.lucene.util.OfflineSorter.BufferSize;
31  import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
32  import org.apache.lucene.util.OfflineSorter.SortInfo;
33  import org.apache.lucene.util.OfflineSorter;
34  
35  /**
36   * Tests for on-disk merge sorting.
37   */
38  public class TestOfflineSorter extends LuceneTestCase {
39    private Path tempDir;
40  
41    @Override
42    public void setUp() throws Exception {
43      super.setUp();
44      tempDir = createTempDir("mergesort");
45    }
46    
47    @Override
48    public void tearDown() throws Exception {
49      if (tempDir != null) {
50        IOUtils.rm(tempDir);
51      }
52      super.tearDown();
53    }
54  
55    public void testEmpty() throws Exception {
56      checkSort(new OfflineSorter(), new byte [][] {});
57    }
58  
59    public void testSingleLine() throws Exception {
60      checkSort(new OfflineSorter(), new byte [][] {
61          "Single line only.".getBytes(StandardCharsets.UTF_8)
62      });
63    }
64  
65    public void testIntermediateMerges() throws Exception {
66      // Sort 20 mb worth of data with 1mb buffer, binary merging.
67      SortInfo info = checkSort(new OfflineSorter(OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.getDefaultTempDir(), 2), 
68          generateRandom((int)OfflineSorter.MB * 20));
69      assertTrue(info.mergeRounds > 10);
70    }
71  
72    public void testSmallRandom() throws Exception {
73      // Sort 20 mb worth of data with 1mb buffer.
74      SortInfo sortInfo = checkSort(new OfflineSorter(OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.getDefaultTempDir(), OfflineSorter.MAX_TEMPFILES), 
75          generateRandom((int)OfflineSorter.MB * 20));
76      assertEquals(1, sortInfo.mergeRounds);
77    }
78  
79    @Nightly
80    public void testLargerRandom() throws Exception {
81      // Sort 100MB worth of data with 15mb buffer.
82      checkSort(new OfflineSorter(OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.getDefaultTempDir(), OfflineSorter.MAX_TEMPFILES), 
83          generateRandom((int)OfflineSorter.MB * 100));
84    }
85  
86    private byte[][] generateRandom(int howMuchDataInBytes) {
87      ArrayList<byte[]> data = new ArrayList<>();
88      while (howMuchDataInBytes > 0) {
89        byte[] current = new byte[random().nextInt(256)];
90        random().nextBytes(current);
91        data.add(current);
92        howMuchDataInBytes -= current.length;
93      }
94      byte [][] bytes = data.toArray(new byte[data.size()][]);
95      return bytes;
96    }
97    
98    static final Comparator<byte[]> unsignedByteOrderComparator = new Comparator<byte[]>() {
99      @Override
100     public int compare(byte[] left, byte[] right) {
101       final int max = Math.min(left.length, right.length);
102       for (int i = 0, j = 0; i < max; i++, j++) {
103         int diff = (left[i]  & 0xff) - (right[j] & 0xff); 
104         if (diff != 0) 
105           return diff;
106       }
107       return left.length - right.length;
108     }
109   };
110 
111   /**
112    * Check sorting data on an instance of {@link OfflineSorter}.
113    */
114   private SortInfo checkSort(OfflineSorter sort, byte[][] data) throws IOException {
115     Path unsorted = writeAll("unsorted", data);
116 
117     Arrays.sort(data, unsignedByteOrderComparator);
118     Path golden = writeAll("golden", data);
119 
120     Path sorted = Files.createTempFile(OfflineSorter.getDefaultTempDir(), "sorted", "");
121     SortInfo sortInfo;
122     try {
123       sortInfo = sort.sort(unsorted, sorted);
124       //System.out.println("Input size [MB]: " + unsorted.length() / (1024 * 1024));
125       //System.out.println(sortInfo);
126       assertFilesIdentical(golden, sorted);
127     } finally {
128       IOUtils.rm(unsorted, golden, sorted);
129     }
130 
131     return sortInfo;
132   }
133 
134   /**
135    * Make sure two files are byte-byte identical.
136    */
137   private void assertFilesIdentical(Path golden, Path sorted) throws IOException {
138     assertEquals(Files.size(golden), Files.size(sorted));
139 
140     byte [] buf1 = new byte [64 * 1024];
141     byte [] buf2 = new byte [64 * 1024];
142     int len;
143     DataInputStream is1 = new DataInputStream(Files.newInputStream(golden));
144     DataInputStream is2 = new DataInputStream(Files.newInputStream(sorted));
145     while ((len = is1.read(buf1)) > 0) {
146       is2.readFully(buf2, 0, len);
147       for (int i = 0; i < len; i++) {
148         assertEquals(buf1[i], buf2[i]);
149       }
150     }
151     IOUtils.close(is1, is2);
152   }
153 
154   private Path writeAll(String name, byte[][] data) throws IOException {
155     Path file = Files.createTempFile(tempDir, name, "");
156     ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(file);
157     for (byte [] datum : data) {
158       w.write(datum);
159     }
160     w.close();
161     return file;
162   }
163   
164   public void testRamBuffer() {
165     int numIters = atLeast(10000);
166     for (int i = 0; i < numIters; i++) {
167       BufferSize.megabytes(1+random().nextInt(2047));
168     }
169     BufferSize.megabytes(2047);
170     BufferSize.megabytes(1);
171     
172     try {
173       BufferSize.megabytes(2048);
174       fail("max mb is 2047");
175     } catch (IllegalArgumentException e) {
176     }
177     
178     try {
179       BufferSize.megabytes(0);
180       fail("min mb is 0.5");
181     } catch (IllegalArgumentException e) {
182     }
183     
184     try {
185       BufferSize.megabytes(-1);
186       fail("min mb is 0.5");
187     } catch (IllegalArgumentException e) {
188     }
189   }
190 
191   public void testThreadSafety() throws Exception {
192     Thread[] threads = new Thread[TestUtil.nextInt(random(), 4, 10)];
193     final AtomicBoolean failed = new AtomicBoolean();
194     final int iters = atLeast(1000);
195     for(int i=0;i<threads.length;i++) {
196       threads[i] = new Thread() {
197           @Override
198           public void run() {
199             try {
200               for(int iter=0;iter<iters && failed.get() == false;iter++) {
201                 checkSort(new OfflineSorter(), generateRandom(1024));
202               }
203             } catch (Throwable th) {
204               failed.set(true);
205               throw new RuntimeException(th);
206             }
207           }
208         };
209       threads[i].start();
210     }
211 
212     for(Thread thread : threads) {
213       thread.join();
214     }
215 
216     assertFalse(failed.get());
217   }
218 }